
2023 iThome 鐵人賽

DAY 22
AI & Data

ML From Scratch series, part 22

[Day 22] Autoencoder — Hands-On Implementation


Yesterday we briefly went over the theory behind Autoencoders.

An Autoencoder is a neural network architecture mainly used for unsupervised learning and feature extraction.

Its main goal is to encode the input data into a low-dimensional representation and then decode it back into the original input.

This process helps capture the key features of the data while discarding redundant information.

Today we implement a Variational Autoencoder.

Compared with a plain Autoencoder, a Variational Autoencoder learns not only a compressed representation but also the statistical distribution of the latent space.
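
Concretely, the objective minimised by the code below is a reconstruction term plus a KL-divergence term that pulls the latent distribution towards a standard normal:

$$L = \mathrm{MSE}(x, \hat{x}) - \frac{1}{2} \sum_{j=1}^{nz} \left(1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2\right)$$

where $\mu$ and $\log \sigma^2$ are the encoder outputs, matching the mu and logvar variables in the implementation.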

Dataset

We use the MNIST dataset, a collection of handwritten digit images in which every image is labelled with the digit it depicts.

Each image is 28 × 28 pixels, in grayscale.

https://ithelp.ithome.com.tw/upload/images/20230922/20152821gyZcnEKwQY.png
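
The post doesn't show its data-loading code; as one option (an assumption on my part, not the author's loader), MNIST can be fetched with scikit-learn and scaled to [0, 1] to match the decoder's sigmoid output range:

from sklearn.datasets import fetch_openml

# Fetch MNIST as 70000 flattened 784-dimensional grayscale vectors.
X, y = fetch_openml("mnist_784", version=1, return_X_y=True, as_frame=False)
X = X / 255.0   # scale pixel values to [0, 1]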

Implementation

Encoder

# Use cupy, a GPU-accelerated drop-in replacement for numpy, when it is
# available; otherwise fall back to numpy.
try:
    import cupy as np
except ImportError:
    import numpy as np

from vae.utils.functionals import initialise_weight, initialise_bias
from vae.utils.functionals import relu, sigmoid


eps = 10e-8


class Encoder(object):
    def __init__(self, input_channels, layer_size, nz, 
                batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
        """
        """
        self.input_channels = input_channels
        self.nz = nz

        self.batch_size = batch_size
        self.layer_size = layer_size

        # Initialise encoder weight
        self.W0 = initialise_weight(self.input_channels, self.layer_size)
        self.b0 = initialise_bias(self.layer_size)

        self.W_mu = initialise_weight(self.layer_size, self.nz)
        self.b_mu = initialise_bias(self.nz)

        self.W_logvar = initialise_weight(self.layer_size, self.nz)
        self.b_logvar = initialise_bias(self.nz)

        # Adam optimiser momentum and velocity
        self.lr = lr
        self.momentum = [0.0] * 6
        self.velocity = [0.0] * 6
        self.beta1 = beta1
        self.beta2 = beta2
        self.t = 0

    def forward(self, x):
        """
        """
        self.e_input = x.reshape((self.batch_size, -1))
        
        # Dimension check on input
        assert self.e_input.shape == (self.batch_size, self.input_channels)

        self.h0_l = self.e_input.dot(self.W0) + self.b0
        self.h0_a = relu(self.h0_l)

        self.logvar = self.h0_a.dot(self.W_logvar) + self.b_logvar
        self.mu = self.h0_a.dot(self.W_mu) + self.b_mu

        # Reparameterisation trick: z = mu + sigma * epsilon, with sigma = exp(0.5 * logvar),
        # which keeps the sampling step differentiable w.r.t. mu and logvar.
        self.rand_sample = np.random.standard_normal(size=(self.batch_size, self.nz))
        self.sample_z = self.mu + np.exp(self.logvar * .5) * self.rand_sample

        return self.sample_z, self.mu, self.logvar

    def optimise(self, grads):
        """
        """
        # ---------------------------
        # Optimise using Adam
        # ---------------------------
        self.t += 1
        # Calculate gradient with momentum and velocity
        for i, grad in enumerate(grads):
            self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * grad
            self.velocity[i] = self.beta2 * self.velocity[i] + (1 - self.beta2) * np.power(grad, 2)
            m_h = self.momentum[i] / (1 - (self.beta1 ** self.t))
            v_h = self.velocity[i] /  (1 - (self.beta2 ** self.t))
            grads[i] = m_h / np.sqrt(v_h + eps)

        grad_W0, grad_b0, grad_W_mu, grad_b_mu, grad_W_logvar, grad_b_logvar = grads

        # Update weights
        self.W0 = self.W0 - self.lr * np.sum(grad_W0, axis=0)
        self.b0 = self.b0 - self.lr * np.sum(grad_b0, axis=0)
        self.W_mu = self.W_mu - self.lr * np.sum(grad_W_mu, axis=0)
        self.b_mu = self.b_mu - self.lr * np.sum(grad_b_mu, axis=0)
        self.W_logvar = self.W_logvar - self.lr * np.sum(grad_W_logvar, axis=0)
        self.b_logvar = self.b_logvar - self.lr * np.sum(grad_b_logvar, axis=0)

        return

    def backward(self, x, grad_dec):
        """
        """
        # ----------------------------------------
        # Calculate gradients from reconstruction
        # ----------------------------------------
        y = np.reshape(x, (self.batch_size, -1))

        db_mu = grad_dec 
        dW_mu = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(grad_dec, axis=1))

        db_logvar = grad_dec * np.exp(self.logvar * .5) * .5 * self.rand_sample
        dW_logvar = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(db_logvar, axis=1))

        drelu = relu(self.h0_l, derivative=True)

        db0 = drelu * (db_mu.dot(self.W_mu.T) + db_logvar.dot(self.W_logvar.T))
        dW0 = np.matmul(np.expand_dims(y, axis=-1), np.expand_dims(db0, axis=1))

        # ----------------------------------------
        # Calculate gradients from K-L
        # ----------------------------------------
        # logvar terms
        dKL_b_logvar = .5 * (np.exp(self.logvar) - 1)
        dKL_W_logvar = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dKL_b_logvar, axis=1))

        # mu terms
        dKL_b_mu = .5 * 2 * self.mu
        dKL_W_mu = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dKL_b_mu, axis=1))

        dKL_b0 = drelu * (dKL_b_logvar.dot(self.W_logvar.T) + dKL_b_mu.dot(self.W_mu.T))
        dKL_W0 = np.matmul(np.expand_dims(y, axis=-1), np.expand_dims(dKL_b0, axis=1))

        # Combine gradients for encoder from recon and KL
        grad_b_logvar = dKL_b_logvar + db_logvar
        grad_W_logvar = dKL_W_logvar + dW_logvar
        grad_b_mu = dKL_b_mu + db_mu
        grad_W_mu = dKL_W_mu + dW_mu
        grad_b0 = dKL_b0 + db0
        grad_W0 = dKL_W0 + dW0

        grads = [grad_W0, grad_b0, grad_W_mu, grad_b_mu, grad_W_logvar, grad_b_logvar]

        # Optimise step
        self.optimise(grads)

        return

cupy is a drop-in replacement for numpy that runs on the GPU, used here to speed up computation.

__init__(self, input_channels, layer_size, nz, batch_size, lr, beta1, beta2)

Initialises the encoder's parameters, including:

  • the number of input channels (input_channels)
  • the hidden-layer size (layer_size)
  • the dimensionality of the latent space (nz)
  • the weights (W)
  • the biases (b)
  • the Adam optimiser state

forward(self, x)

This implements the forward pass.

It takes the input x, encodes it, and computes the sampled latent variable sample_z, the mean (mu), and the log variance (logvar).
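
As a minimal usage sketch (the layer sizes below are illustrative assumptions, not values from the post, and the vae.utils.functionals helpers are assumed to be importable):

enc = Encoder(input_channels=784, layer_size=512, nz=32, batch_size=64)
x = np.random.rand(64, 784)            # a fake batch of flattened 28x28 images
sample_z, mu, logvar = enc.forward(x)  # sample_z, mu, logvar each have shape (64, 32)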

optimise(self, grads)

The optimiser step: uses the Adam algorithm to update the weights and biases.
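
For reference, with gradient $g_t$ at step $t$, the update follows the standard Adam rule, with epsilon placed inside the square root exactly as in the code:

$$m_t = \beta_1 m_{t-1} + (1 - \beta_1) g_t, \qquad v_t = \beta_2 v_{t-1} + (1 - \beta_2) g_t^2$$

$$\hat{m}_t = \frac{m_t}{1 - \beta_1^t}, \qquad \hat{v}_t = \frac{v_t}{1 - \beta_2^t}, \qquad \theta_t = \theta_{t-1} - \mathrm{lr} \cdot \frac{\hat{m}_t}{\sqrt{\hat{v}_t + \epsilon}}$$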

backward(self, x, grad_dec)

Implements the backward pass: it computes the gradients coming from the reconstruction loss and from the Kullback-Leibler (KL) divergence, then calls optimise to update the encoder's weights and biases.
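
For reference, the KL divergence between the approximate posterior and the standard normal prior, together with the two gradients that appear as dKL_b_mu and dKL_b_logvar in the code, is:

$$KL = -\frac{1}{2} \sum_{j=1}^{nz} \left(1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2\right)$$

$$\frac{\partial KL}{\partial \mu_j} = \mu_j, \qquad \frac{\partial KL}{\partial \log \sigma_j^2} = \frac{1}{2}\left(e^{\log \sigma_j^2} - 1\right)$$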

Decoder

# Use cupy (GPU) when available, otherwise fall back to numpy.
try:
    import cupy as np
except ImportError:
    import numpy as np

from vae.utils.functionals import initialise_weight, initialise_bias
from vae.utils.functionals import MSELoss
from vae.utils.functionals import relu, sigmoid


eps = 10e-8


class Decoder(object):
    def __init__(self, input_channels, layer_size, nz, 
                batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
        
        self.input_channels = input_channels
        self.nz = nz

        self.batch_size = batch_size
        self.layer_size = layer_size

        # Initialise decoder weight
        self.W0 = initialise_weight(self.nz, self.layer_size)
        self.b0 = initialise_bias(self.layer_size)

        self.W1 = initialise_weight(self.layer_size, self.input_channels)
        self.b1 = initialise_bias(self.input_channels)

        # Adam optimiser momentum and velocity
        self.lr = lr
        self.momentum = [0.0] * 4
        self.velocity = [0.0] * 4
        self.beta1 = beta1
        self.beta2 = beta2
        self.t = 0

    def forward(self, z):
        self.z = np.reshape(z, (self.batch_size, self.nz))

        self.h0_l = self.z.dot(self.W0) + self.b0
        self.h0_a = relu(self.h0_l)

        self.h1_l = self.h0_a.dot(self.W1) + self.b1
        self.h1_a = sigmoid(self.h1_l)

        self.d_out = np.reshape(self.h1_a, (self.batch_size, self.input_channels))

        return self.d_out

    def optimise(self, grads):
        """
        """
        # ---------------------------
        # Optimise using Adam
        # ---------------------------
        self.t += 1
        # Calculate gradient with momentum and velocity
        for i, grad in enumerate(grads):
            self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * grad
            self.velocity[i] = self.beta2 * self.velocity[i] + (1 - self.beta2) * np.power(grad, 2)
            m_h = self.momentum[i] / (1 - (self.beta1 ** self.t))
            v_h = self.velocity[i] /  (1 - (self.beta2 ** self.t))
            grads[i] = m_h / np.sqrt(v_h + eps)

        grad_dW0, grad_db0, grad_dW1, grad_db1 = grads

        # Update weights
        self.W0 = self.W0 - self.lr * np.sum(grad_dW0, axis=0)
        self.b0 = self.b0 - self.lr * np.sum(grad_db0, axis=0)
        self.W1 = self.W1 - self.lr * np.sum(grad_dW1, axis=0)
        self.b1 = self.b1 - self.lr * np.sum(grad_db1, axis=0)

        return

    def backward(self, x, out):
        # ----------------------------------------
        # Calculate gradients from reconstruction
        # ----------------------------------------
        y = np.reshape(x, (self.batch_size, -1))
        out = np.reshape(out, (self.batch_size, -1))

        dL = MSELoss(out, y, derivative=True)
        dSig = sigmoid(self.h1_l, derivative=True)

        dL_dSig = dL * dSig

        grad_db1 = dL_dSig
        grad_dW1 = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dL_dSig, axis=1))
        
        drelu0 = relu(self.h0_l, derivative=True)

        grad_db0 = grad_db1.dot(self.W1.T) * drelu0
        grad_dW0 = np.matmul(np.expand_dims(self.z, axis=-1), np.expand_dims(grad_db0, axis=1))

        # output gradient to the encoder layer
        grad_dec = grad_db0.dot(self.W0.T)

        grads = [grad_dW0, grad_db0, grad_dW1, grad_db1]

        # Optimiser Step
        self.optimise(grads)

        return grad_dec

This implements the decoder, which maps latent variables back to generated data points and learns its weights with the Adam optimiser.

The decoder consists of two fully-connected layers, with a ReLU activation on the hidden layer and a Sigmoid on the output layer.

In the backward pass, the gradients are computed and the weights are updated with Adam.
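
Once trained, the decoder doubles as a generator: feed it latent vectors sampled from the prior. A sketch, with the sizes again being illustrative assumptions:

dec = Decoder(input_channels=784, layer_size=512, nz=32, batch_size=64)
z = np.random.standard_normal((64, 32))   # sample from the N(0, I) prior
imgs = dec.forward(z)                     # shape (64, 784), values in (0, 1) from the sigmoid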

VAE

# Use cupy (GPU) when available, otherwise fall back to numpy.
try:
    import cupy as np
except ImportError:
    import numpy as np

from .encoder_model import Encoder
from .decoder_model import Decoder

eps = 10e-8


class VariationalAutoEncoder(object):

    def __init__(self, input_channels, layer_size, nz, 
                batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
        
        self.input_channels = input_channels
        self.nz = nz

        self.batch_size = batch_size
        self.layer_size = layer_size

        # Construct encoder module
        self.encoder = Encoder(input_channels, layer_size, nz, 
            batch_size=batch_size, lr=lr, beta1=beta1, beta2=beta2)
        
        # Construct decoder module
        self.decoder = Decoder(input_channels, layer_size, nz,
            batch_size=batch_size, lr=lr, beta1=beta1, beta2=beta2)
        
    def forward(self, x):
        """
        """
        x = x.reshape((self.batch_size, -1))

        # Feed forward encoder - decoder
        sample_z, mu, logvar = self.encoder.forward(x)
        out = self.decoder.forward(sample_z)

        return out, mu, logvar

    def backward(self, x, out):
        """
        """
        grad_dec = self.decoder.backward(x, out)
        self.encoder.backward(x, grad_dec)

from .encoder_model import Encoder and from .decoder_model import Decoder import the custom Encoder and Decoder classes.

eps = 10e-8: defines a very small constant (equal to 1e-7) used to avoid division by zero and other numerical instabilities.

__init__(self, input_channels, layer_size, nz, batch_size, lr, beta1, beta2)

Initialises the VAE's parameters, including:

  • the number of input channels (input_channels)
  • the hidden-layer size (layer_size)
  • the dimensionality of the latent variable (nz)
  • the batch size (batch_size)
  • the learning rate (lr)
  • beta1 and beta2, the Adam optimiser's hyperparameters

During initialisation it constructs the Encoder and Decoder modules, which are used for the VAE's training and generation.

forward(self, x)

The VAE's forward-pass function.

It takes the input x, reshapes it into the proper shape, and passes it through the Encoder to obtain the sampled latent variable sample_z, the mean mu, and the log variance logvar.

It then decodes sample_z with the Decoder and returns the generated output together with the mean and log variance.

backward(self, x, out)

The VAE's backward-pass function.

It takes the input x and the forward-pass output out, and runs the Decoder's backward pass to compute the gradients.

It then uses the gradient flowing back into the latent space (grad_dec) to update the Encoder's weights during training.
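
Putting the pieces together, one training run might look like the following sketch. The batches iterable, the sizes, and the logging are assumptions for illustration; MSELoss is the helper imported in the decoder module, assumed here to return the loss value when called without derivative=True:

vae = VariationalAutoEncoder(input_channels=784, layer_size=512, nz=32, batch_size=64)

for epoch in range(10):
    for x in batches:                      # assumed: (64, 784) arrays scaled to [0, 1]
        out, mu, logvar = vae.forward(x)   # encode, sample z, decode
        vae.backward(x, out)               # gradients + one Adam step for both modules
    recon = MSELoss(out, x.reshape(64, -1))                    # reconstruction term
    kl = -0.5 * np.sum(1 + logvar - mu ** 2 - np.exp(logvar))  # KL term
    print(f"epoch {epoch}: recon={float(recon):.4f}  kl={float(kl):.4f}")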


Tomorrow, we will use a Variational Autoencoder in a Kaggle competition to tackle problems posed by the Labelled Faces in the Wild (LFW) dataset!

